#!/usr/bin/env python3
# Copyright 2019-2023 Alibaba Group Holding Limited.
# SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause

import sys
import os
from math import log, ceil
import logging
from glob import glob
from sh import awk, rpm, lsmod, grep, dmesg, mount, rm, dd

import colorlog

# Root-logger configuration: every record of level INFO or above is written
# through a stream handler with a colourised "timestamp level message" layout.
_LOG_FORMAT = '%(cyan)s%(asctime)s%(reset)s %(log_color)s%(levelname)s %(white)s%(message)s%(reset)s'
_LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'

handler = logging.StreamHandler()
handler.setFormatter(
    colorlog.ColoredFormatter(_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT)
)

_root_logger = logging.getLogger()
_root_logger.setLevel(logging.INFO)
_root_logger.addHandler(handler)

class TestMemPressure:
    """Install and remove the scheduler rpm while available memory shrinks.

    Each level pins more memory by appending zero-filled 2M blocks to a file
    on /dev/shm, then installs the rpm, checks that the kernel module shows up
    in ``lsmod`` and removes the package again.  If the kernel OOM-killed
    ``rpm`` or ``insmod`` the run is ended early with exit status 0.

    All memory sizes are in kB, the unit of ``MemAvailable`` in /proc/meminfo.
    """

    RPM_DIR = '/tmp/work'             # Directory searched for scheduler*.rpm
    PIN_FILE = '/dev/shm/pin'         # tmpfs file whose pages pin the memory
    MODULE_NAME = 'scheduler'         # Name looked up in lsmod output
    PACKAGE_NAME = 'scheduler-xxx'    # Package name passed to `rpm -e`
    OOM_VICTIMS = ('rpm', 'insmod')   # OOM kills of these end the test early

    @staticmethod
    def _mem_available():
        """Return the current MemAvailable value of /proc/meminfo (kB)."""
        return int(awk('/MemAvailable/{print $2}', '/proc/meminfo'))

    def setup_class(self):
        """Record the memory baseline, locate the rpm and enlarge /dev/shm."""
        # Level N leaves step**N of the initially available memory.
        self.step = 0.4
        # The most extreme test case: only 50 * 1024 kB left available.
        self.min_mem = 50 * 1024
        self.all_available = self._mem_available()
        # Number of levels needed for step**level to shrink the baseline
        # down to min_mem.
        self.iterations = ceil(log(1.0 * self.min_mem / self.all_available, self.step))
        self.rpm = self.get_rpm()
        # Lift the tmpfs size limit so the pin file can grow as far as needed.
        mount('-o', 'size=1P', '-o', 'remount', '/dev/shm')

    def teardown_class(self):
        """Delete the pin file and erase the package if the module is loaded."""
        rm(self.PIN_FILE, force=True)
        # grep exits with 1 when nothing matches; accept that as "not loaded".
        if grep(lsmod(), self.MODULE_NAME, word_regexp=True, _ok_code=[0, 1]).exit_code == 0:
            rpm('-e', self.PACKAGE_NAME)

    def get_rpm(self):
        """Return the glob result holding the single scheduler rpm.

        Returns:
            A one-element list with the path matching ``scheduler*.rpm``
            inside ``RPM_DIR``.

        Raises:
            SystemExit: with status 1 when zero or several files match.
        """
        scheduler_rpm = glob(os.path.join(self.RPM_DIR, 'scheduler*.rpm'))
        if len(scheduler_rpm) != 1:
            logging.error("Please check your scheduler rpm")
            sys.exit(1)
        return scheduler_rpm

    def check_oom(self, pin_mem):
        """End the whole run early if dmesg shows an OOM kill of rpm/insmod.

        Args:
            pin_mem: Available-memory target (kB) in effect, used only for
                the warning message.

        Returns normally when no matching "Killed process" line is found.
        Otherwise cleans up and terminates the process with status 0.
        """
        exit_if_oom = '|'.join(self.OOM_VICTIMS)
        # Raw string: "\(" must reach grep as a literal backslash-paren.
        pattern = r'Killed process [0-9]* \((%s)\)' % exit_if_oom
        if grep(dmesg(), '-P', pattern, _ok_code=[0, 1]).exit_code == 1:
            return
        logging.warning("Test exited early because oomed when pinning %d kbytes memory", pin_mem)
        self.teardown_class()
        # os._exit ends the process immediately, skipping any pending
        # exception handling further up the stack.
        os._exit(0)

    def pin_memory(self, target):
        """Grow the pin file until roughly ``target`` kB remain available.

        Does nothing when available memory is already at or below ``target``.

        Args:
            target: Desired MemAvailable value in kB.
        """
        left = self._mem_available()
        if left <= target:
            return
        logging.info("Adjusting available memory from %dKB to %dKB", left, target)
        # dd writes 2M blocks, i.e. 2048 kB each; round up to whole blocks.
        new_anonymous_obj = ceil((left - target) / 2048.0)
        dd('if=/dev/zero', 'of=%s' % self.PIN_FILE, 'bs=2M',
           'count=%d' % new_anonymous_obj, 'oflag=append', 'conv=notrunc')
        self.check_oom(target)

    def test_level(self, level):
        """Run one install/verify/remove cycle at the given pressure level.

        Args:
            level: Exponent applied to ``step``; the available-memory target
                is ``all_available * step ** level``, floored at ``min_mem``.

        Raises:
            Exception: re-raised from the failing command when the failure
                was not caused by an OOM kill of rpm/insmod.
        """
        # Start from an empty kernel log so check_oom only sees this level.
        dmesg(clear=True)
        target = max(self.all_available * self.step ** level, self.min_mem)
        self.pin_memory(target)
        curr = self._mem_available()
        logging.info("Installing rpm when available memory = %dKB", curr)
        try:
            rpm('-ivh', self.rpm)
            # No _ok_code here: a missing module makes grep fail the level.
            grep(lsmod(), self.MODULE_NAME, word_regexp=True)
            rpm('-e', self.PACKAGE_NAME)
        except Exception:
            # An OOM kill ends the run inside check_oom; anything else is a
            # genuine failure and propagates.
            self.check_oom(target)
            raise

    def test_all(self):
        """Run every level, including three extra rounds past ``iterations``."""
        for level in range(1, self.iterations + 4):
            self.test_level(level)

if __name__ == '__main__':
    # Script entry point: prepare the environment, run every pressure level,
    # and clean up afterwards.
    test_unit = TestMemPressure()
    test_unit.setup_class()
    try:
        test_unit.test_all()
    finally:
        # Run the cleanup even when a level raises; otherwise the pin file
        # would keep occupying /dev/shm and the rpm could stay installed.
        test_unit.teardown_class()
